package com.xueyuan.wata.daph.spark3.api.node.dataframe

import com.xueyuan.wata.daph.spark3.api.node.SparkNode
import com.xueyuan.wata.daph.utils.SQLUtil
import org.apache.spark.sql.DataFrame

/**
 * Mixin for Spark nodes that work with `DataFrame`s, providing a helper for
 * applying an optional, user-configured SQL statement to a `DataFrame`.
 */
trait DataFrameNode extends SparkNode {
  /**
   * Applies the SQL statement configured under `sqlType` (if any) to `df`.
   *
   * When `nodeDescription.extraOptions` has an entry for `sqlType`, `df` is
   * registered as a temporary view named after the first table name that
   * `SQLUtil.getTableNames` reports for that statement, and the statement is
   * then executed through `spark.sql`. When there is no such entry, `df` is
   * returned unchanged.
   *
   * @param sqlType key under which the SQL statement is looked up in
   *                `nodeDescription.extraOptions`
   * @param df      the `DataFrame` to expose to the SQL statement
   * @return the result of running the configured SQL, or `df` itself when no
   *         SQL is configured for `sqlType`
   * @throws NoSuchElementException if SQL is configured but no table name
   *                                could be extracted from it
   */
  protected def executeSQL(sqlType: String, df: DataFrame): DataFrame =
    nodeDescription.extraOptions.get(sqlType) match {
      case Some(sql) =>
        // Only the first extracted table name is bound to `df`.
        // NOTE(review): any other tables the statement references are
        // presumably expected to already exist in the session — confirm.
        val tableName = SQLUtil.getTableNames(sql).headOption.getOrElse(
          throw new NoSuchElementException(
            s"No table name found in the '$sqlType' SQL statement: $sql"
          )
        )
        df.createOrReplaceTempView(tableName)
        spark.sql(sql)
      case None => df
    }
}
